SQS FIFO キューで処理に失敗したときの実行順序を調べてみた
西田@大阪です
今回はSQS FIFO キューで処理に失敗した場合に実行順序がどうなるのかを調べてみました
SQS の FIFO とは
SQS の FIFOキューについては以下の記事を参照ください
標準キューと違って以下の点が特徴です
- キューの重複がおこりません
- キューの順序が保証されます
疑問
SQS FIFO のドキュメントから抜粋です
複数回の再試行
■ コンシューマーが失敗した SendMessage アクションを検出した場合、同じメッセージ重複排除 ID を使用して必要に応じて何回でも送信を再試行できます。重複排除の期間が終了する前にプロデューサーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響したり、重複が発生したりすることはありません。
■ コンシューマーが失敗した ReceiveMessage アクションを検出した場合、同じ受信リクエスト試行 ID を使用して必要に応じて何回でも再試行できます。可視性タイムアウトが終了する前にコンシューマーが少なくとも 1 つの確認を受信した場合、複数の再試行を行ってもメッセージの順序に影響はありません。
■ メッセージグループ ID を含むメッセージを受信した場合、メッセージを削除するか、メッセージが表示されるまで、同じメッセージグループ ID のメッセージはそれ以上返されません。
ドキュメントからは以下のことは保証されていそうです
- 再試行されてもューの順序は守られる
- 再試行されてもキューの重複はおこらない
ただ、以下のケースについてはうまく読み取れませんでした
- 複数キューが失敗した場合は再試行される順序は保証されるのか?
- 複数サブスクライバーがいた場合、失敗したキューが Dead Letter Queueにいくまですべてのコンシューマーはブロックされるのか?
先に答えから
先に答えから書きます
- 失敗しても順番は 保証されます
- 失敗したキューが Dead Letter Queue に行くまで すべてのサブスクライバーはブロックされます
※ VisibilityTimeout
を超えてキューが処理が続行してしまう場合は順番は保証されず、複数回実行される点をご注意ください
試してみた
以下の設定で Dead Letter Queue に行くSQS FIFOキューを作成します
- VisibilityTimeout が 3秒
- 2回失敗
AWSTemplateFormatVersion: "2010-09-09" Description: "SQS FIFO Template" Resources: CreateSetuppackDeadLetterQueue: Type: AWS::SQS::Queue Properties: QueueName: sqs-sample-dl.fifo FifoQueue: true CreateSetuppackQueue: Type: AWS::SQS::Queue Properties: QueueName: sqs-sample.fifo FifoQueue: true ContentBasedDeduplication: false RedrivePolicy: deadLetterTargetArn: Fn::GetAtt: - CreateSetuppackDeadLetterQueue - Arn maxReceiveCount: 2 VisibilityTimeout: 3
パブリッシャー側のコードです
def enqueue(n=1): for i in range(n): sqs.send_message( QueueUrl=SQS_URL, MessageBody=f"{i}", MessageGroupId="test", MessageDeduplicationId=str(uuid.uuid4()) )
渡された引数の数値の数だけ "1", "2" ... と数字(str)をエンキューしていきます
サブスクライバー側のコードです。
def dequeue(): print(threading.get_ident()) while True: response = sqs.receive_message( QueueUrl=SQS_URL, MaxNumberOfMessages=1 ) if 'Messages' not in response: time.sleep(0.1) continue for message in response['Messages']: if message['Body'] == '15': print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']} failed") time.sleep(5) else: print(f"{threading.get_ident()} {datetime.datetime.now()} {message['Body']}") handle = message['ReceiptHandle'] try: sqs.delete_message( QueueUrl=SQS_URL, ReceiptHandle=handle ) except ClientError as e: print(e) enqueue(20) with ThreadPoolExecutor(max_workers=3) as executor: executor.submit(dequeue) executor.submit(dequeue) executor.submit(dequeue)
メッセージを受信しメッセージの内容が'15'
であれば VisibilityTimeout
より長めにスリープしています
実行結果です
スレッドID, 実行時間, 処理したメッセージの内容 の順に表示されています
123145397649408 123145414438912 123145431228416 123145397649408 2020-01-31 08:20:25.727753 0 123145397649408 2020-01-31 08:20:25.794476 1 123145397649408 2020-01-31 08:20:25.865409 2 123145397649408 2020-01-31 08:20:25.930104 3 123145431228416 2020-01-31 08:20:25.993035 4 123145431228416 2020-01-31 08:20:26.057589 5 123145431228416 2020-01-31 08:20:26.133130 6 123145397649408 2020-01-31 08:20:26.173065 7 123145397649408 2020-01-31 08:20:26.237981 8 123145397649408 2020-01-31 08:20:26.303023 9 123145414438912 2020-01-31 08:20:26.342946 10 123145414438912 2020-01-31 08:20:26.397174 11 123145414438912 2020-01-31 08:20:26.462444 12 123145414438912 2020-01-31 08:20:26.527355 13 123145414438912 2020-01-31 08:20:26.588985 14 123145414438912 2020-01-31 08:20:26.653587 15 failed 123145397649408 2020-01-31 08:20:29.719290 15 failed 123145414438912 2020-01-31 08:20:32.746732 16 123145414438912 2020-01-31 08:20:32.801750 17 123145414438912 2020-01-31 08:20:32.867375 18 123145414438912 2020-01-31 08:20:32.934917 19
0から順に処理され 15
で2回失敗する間すべてのサブスクライバーが止まってることが確認できると思います
最後に
この記事が誰かの参考になれば幸いです